package cn.stylefeng.roses.kernel.sync.modular.ew;

import cn.stylefeng.roses.kernel.sync.core.util.CustomSpringContextHolder;
import cn.stylefeng.roses.kernel.sync.modular.ew.base.AbstractEntryWrapper;
import cn.stylefeng.roses.kernel.sync.modular.ra.AbstractRowDataAction;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

/**
 * Handler for entries that carry insert/update/delete row changes.
 *
 * @author fengshuonan
 * @date 2019-01-16-7:12 PM
 */
@Slf4j
public class RowDataEntryWrapper implements AbstractEntryWrapper {

    /**
     * Walks the given entries and, for each ROWDATA entry that is neither a
     * QUERY event nor flagged as DDL, hands every contained row to each
     * registered {@link AbstractRowDataAction} whose
     * {@code matches(schemaName, tableName, eventType)} returns true.
     * Returns without touching the entries when no action beans are found.
     *
     * @param entrys the entries to process
     * @throws RuntimeException if the stored value of a ROWDATA entry cannot be parsed
     */
    @Override
    public void processEntrys(List<CanalEntry.Entry> entrys) {

        List<AbstractRowDataAction> actions = CustomSpringContextHolder.getBeanOfType(AbstractRowDataAction.class);
        if (actions == null || actions.isEmpty()) {
            return;
        }

        for (CanalEntry.Entry current : entrys) {

            // Only entries of type ROWDATA are handled here; skip everything else.
            if (current.getEntryType() != EntryType.ROWDATA) {
                continue;
            }

            RowChange change = parseRowChange(current);

            // Operation type of this change.
            EventType operation = change.getEventType();

            // Skip QUERY events and DDL changes (e.g. create table / drop table).
            if (operation == EventType.QUERY || change.getIsDdl()) {
                continue;
            }

            // Database (schema) name and table name come from the entry header.
            CanalEntry.Header header = current.getHeader();
            String schema = header.getSchemaName();
            String table = header.getTableName();

            // Rows are the outer loop so that every matching action sees a row
            // before the next row is dispatched.
            for (CanalEntry.RowData row : change.getRowDatasList()) {
                for (AbstractRowDataAction action : actions) {
                    if (action.matches(schema, table, operation)) {
                        action.doExecute(row);
                    }
                }
            }
        }
    }

    /**
     * Parses the stored value of an entry into a {@link RowChange}.
     *
     * @param entry the entry whose store value is parsed
     * @return the parsed row change
     * @throws RuntimeException wrapping any exception raised while parsing
     */
    private static RowChange parseRowChange(CanalEntry.Entry entry) {
        try {
            return RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
        }
    }
}
